package org.springblade.thingsphere.network.components.mqtt.client;

import cn.hutool.core.util.StrUtil;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.mqtt.MqttClient;
import io.vertx.mqtt.MqttClientOptions;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springblade.thingsphere.CompositeProtocolSupport;
import org.springblade.thingsphere.domain.Message;
import org.springblade.thingsphere.manage.pojo.entity.GatewayEntity;
import org.springblade.thingsphere.manage.service.IGatewayService;
import org.springblade.thingsphere.message.DeviceMessageCodec;
import org.springblade.thingsphere.network.components.NetworkProvider;
import org.springblade.thingsphere.network.components.NetworkType;
import org.springblade.thingsphere.network.components.bus.event.MqttClientMsgEvent;
import org.springblade.thingsphere.network.components.common.MqttClientConfig;
import org.springblade.thingsphere.network.protocol.impl.ProtocolAssetSupplier;
import org.springblade.thingsphere.type.DefaultTransport;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @author lhb
 * @date 2024/9/26 下午12:22
 */
@Slf4j
@Component
public class VertxMqttClient implements NetworkProvider<MqttClientConfig, MqttClient> {

	@Resource
	private ApplicationEventPublisher applicationEventPublisher;
	@Resource
	private IGatewayService gatewayService;
	@Resource
	private ProtocolAssetSupplier protocolAssetSupplier;
	@Resource
	private Vertx vertx;

	private final static Map<Long, MqttClient> mqttClientMap = new ConcurrentHashMap<>();

	@Override
	public NetworkType getType() {
		return NetworkType.MQTT_CLIENT;
	}

	@Override
	public MqttClient createNetwork(MqttClientConfig config) {

		MqttClient item = mqttClientMap.remove(config.getId());
		if (null != item) {
			item.disconnect();
		}
		MqttClientOptions options = new MqttClientOptions();
		options.setAutoKeepAlive(true);
		options.setClientId(config.getClientId());
		options.setCleanSession(true);
		if (StrUtil.isNotBlank(config.getUserName())) {
			options.setUsername(config.getUserName());
		}
		if (StrUtil.isNotBlank(config.getPassword())) {
			options.setPassword(config.getPassword());
		}

		options.setWillQoS(0);
		MqttClient mqttClient = MqttClient.create(vertx, options);

		mqttClient.connect(config.getPort(), config.getHost(), handler -> {
			if (handler.succeeded()) {
				log.info("mqtt client connected");
				String[] split = config.getReportPropertyPath().split(",");
				for (String topic : split) {
					mqttClient.subscribe(topic, 0, subHandler -> {
						if (subHandler.succeeded()) {
							log.info(subHandler.result().toString());
						} else {
							log.error("Subscription failure:{}", subHandler.cause().getMessage());
						}
					});
				}
			} else {
				log.error("Connection failure:{}", handler.cause().getMessage());
			}
		});

		// 接收订阅的消息
		mqttClient.publishHandler(publish -> {
			try {
				String topic = publish.topicName();
				Buffer payload = publish.payload();
				log.info("topic:{} publish.messageId():{} payload:{}", topic, publish.messageId(), payload.toString());

				// 查询网络组件对应的网关
				GatewayEntity gateway = gatewayService.dictByNetworkId(config.getId());
				if (null == gateway || 0 == gateway.getStatus()) {
					log.info("The corresponding gateway was not found");
					return;
				}
				// 调用协议包解析请求的内容
				CompositeProtocolSupport compositeProtocolSupport = protocolAssetSupplier.getProtocolSupportById(gateway.getProtocolId());
				if (null == compositeProtocolSupport) {
					log.info("mqtt protocol package handler not found");
					return;
				}
				DeviceMessageCodec deviceMessageCodec = compositeProtocolSupport.getMessageCodecSupports().get(DefaultTransport.MQTT.getId());
				if (null == deviceMessageCodec) {
					log.info("mqtt codec is not implemented in the protocol package");
					return;
				}
				List<Message> decoder = deviceMessageCodec.decoder(payload.toString(Charset.defaultCharset()));
				// 消息存储到bus
				applicationEventPublisher.publishEvent(new MqttClientMsgEvent<>(this, decoder));
			} catch (Exception e) {
				log.error(e.getMessage(), e);
			}
		});
		mqttClientMap.put(config.getId(), mqttClient);
		return mqttClient;
	}

	@Override
	public void shutdown(Long id) {
		MqttClient mqttClient = mqttClientMap.remove(id);
		if (null != mqttClient) {
			mqttClient.disconnect();
		}
	}
}
